package com.uxsino.reactorq.processor;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;

import reactor.core.publisher.Flux;
import reactor.core.publisher.TopicProcessor;

/**
 * Bounded hand-off queue placed in front of a Reactor {@link TopicProcessor}.
 *
 * <p>Producers call {@link #next(Object)} to enqueue values into a bounded
 * {@link LinkedBlockingQueue}. A dedicated worker thread, started by the constructor, drains
 * the queue and forwards every value to the processor via {@code onNext}. Consumers attach
 * with {@link #subscribe(Consumer)} or build their own pipeline with {@link #filter(Predicate)}.
 *
 * <p>Created 2019/6/18 16:32.
 *
 * @param <T> type of the values passed through the queue
 * @author <a href="mailto:royrxc@gmail.com">Administrator</a>
 */
public final class QueueProcessor<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(QueueProcessor.class);

    /** Buffer size of the underlying processor and queue capacity used by {@link #of(String)}. */
    public static final int MIN_SIZE = 16;

    /** Longest time, in milliseconds, the worker or a producer blocks before re-checking the running flag. */
    public static final int WAIT_TIME_MS = 5000;

    private final BlockingQueue<T> queue;
    private final String name;
    private final TopicProcessor<T> processor;
    private final Thread worker;

    /** Callbacks registered through {@link #doOnError(Consumer)}. */
    private final List<Consumer<? super Throwable>> errorHandlers = new CopyOnWriteArrayList<>();

    /** Callbacks registered through {@link #doOnComplete(Runnable)}. */
    private final List<Runnable> completeHandlers = new CopyOnWriteArrayList<>();

    /** Written by {@link #stop()} and read by the worker and by producers, hence volatile. */
    private volatile boolean running = true;

    /**
     * Creates and starts a processor with the given thread name and queue capacity.
     *
     * @param name     name of the worker thread and of the processor; a default is used when null or empty
     * @param poolSize queue capacity; a default of {@code MIN_SIZE * 4} is used when not positive
     * @param <T>      type of the values passed through the queue
     * @return a started processor
     */
    public static <T> QueueProcessor<T> of(String name, int poolSize) {
        return new QueueProcessor<T>(name, poolSize);
    }

    /**
     * Creates and starts a processor with the default name and the given queue capacity.
     *
     * @param poolSize queue capacity; a default of {@code MIN_SIZE * 4} is used when not positive
     * @param <T>      type of the values passed through the queue
     * @return a started processor
     */
    public static <T> QueueProcessor<T> of(int poolSize) {
        return new QueueProcessor<T>(null, poolSize);
    }

    /**
     * Creates and starts a processor with the given name and a queue capacity of {@link #MIN_SIZE}.
     *
     * @param name name of the worker thread and of the processor; a default is used when null or empty
     * @param <T>  type of the values passed through the queue
     * @return a started processor
     */
    public static <T> QueueProcessor<T> of(String name) {
        return new QueueProcessor<T>(name, MIN_SIZE);
    }

    private QueueProcessor(String name, int poolSize) {
        final int capacity = poolSize > 0 ? poolSize : MIN_SIZE * 4;
        final String effectiveName = Strings.isNullOrEmpty(name) ? "uxsino-pool" : name;

        this.name = effectiveName;
        this.processor = TopicProcessor.<T> builder()
                .bufferSize(MIN_SIZE)
                .autoCancel(false)
                .name(effectiveName)
                .build();
        this.queue = new LinkedBlockingQueue<>(capacity);
        // Started last so that every field is assigned before the worker can observe it.
        this.worker = new Thread(this, effectiveName);
        this.worker.start();
    }

    /**
     * Worker loop: takes values from the queue and hands them to the processor until
     * {@link #stop()} is called or the thread is interrupted.
     */
    @Override
    public void run() {
        while (running) {
            final T item;
            try {
                // A timed poll replaces the former isEmpty()/wait() pair, which could miss a
                // notification sent between the emptiness check and the wait.
                item = this.queue.poll(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                if (running) {
                    LOG.warn("Thread {} interrupted while waiting, stopping", this.name);
                    running = false;
                }
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                break;
            }
            if (item == null) {
                continue; // timed out with nothing queued; re-check the running flag
            }
            try {
                processor.onNext(item);
            } catch (Exception e) {
                // A single failing hand-off must not terminate the worker.
                LOG.warn("Thread {}, error", this.name, e);
            }
        }
    }

    /**
     * Subscribes a consumer to the processor. Exceptions thrown by the consumer are logged and
     * swallowed so that a failing consumer is not cancelled. Error and completion signals
     * reaching this subscription are forwarded to the callbacks registered with
     * {@link #doOnError(Consumer)} and {@link #doOnComplete(Runnable)}.
     *
     * @param consumer callback invoked for every value
     */
    public void subscribe(Consumer<? super T> consumer) {
        this.processor.subscribe(item -> {
            try {
                consumer.accept(item);
            } catch (Exception e) {
                LOG.warn("consumer error", e);
            }
        }, this::dispatchError, this::dispatchComplete);
    }

    /**
     * Registers a callback for error signals of the processor. The callback is invoked once for
     * every subscription created through {@link #subscribe(Consumer)} that receives the error,
     * regardless of whether it was registered before or after that subscription.
     *
     * @param onError callback to register; ignored when null
     */
    public void doOnError(Consumer<? super Throwable> onError) {
        if (onError != null) {
            // Flux#doOnError returns a new publisher instead of mutating the processor, so the
            // callback is stored here and applied at subscription level.
            this.errorHandlers.add(onError);
        }
    }

    /**
     * Registers a callback for the completion signal of the processor. The callback is invoked
     * once for every subscription created through {@link #subscribe(Consumer)} that completes,
     * regardless of whether it was registered before or after that subscription.
     *
     * @param complete callback to register; ignored when null
     */
    public void doOnComplete(Runnable complete) {
        if (complete != null) {
            // Same reasoning as doOnError: the operator result cannot simply be discarded.
            this.completeHandlers.add(complete);
        }
    }

    /**
     * Enqueues a value for delivery. Blocks while the queue is full and the processor is
     * running. Null values are rejected with a warning, and values offered after
     * {@link #stop()} are dropped with a warning instead of blocking forever.
     *
     * @param t value to enqueue
     */
    public void next(T t) {
        if (t == null) {
            LOG.warn("QueueProcessor next error: value is null");
            return;
        }
        if (this.queue.remainingCapacity() <= 0) {
            LOG.warn("queue {} full, total size {} watting .....", this.name, this.queue.size());
        }
        try {
            // Timed offers let a blocked producer notice that the worker has been stopped.
            while (running) {
                if (this.queue.offer(t, WAIT_TIME_MS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            }
            LOG.warn("queue {} stopped, value dropped", this.name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("QueueProcessor next method error", e);
        }
    }

    /**
     * Returns the number of values currently waiting in the queue.
     *
     * @return current queue size
     */
    public int size() {
        return this.queue.size();
    }

    /**
     * Stops the worker thread. Values still in the queue are not delivered.
     *
     * <p>NOTE(review): the underlying processor is neither completed nor shut down here;
     * confirm whether callers expect that.
     */
    public void stop() {
        this.running = false;
        // Wake the worker if it is blocked in poll() so it exits promptly.
        this.worker.interrupt();
    }

    /**
     * Returns a view of the processor that only emits values accepted by the predicate.
     *
     * @param filter predicate applied to every value
     * @return the filtered flux
     */
    public Flux<T> filter(Predicate<? super T> filter) {
        return this.processor.filter(filter);
    }

    /** Forwards an error signal to every registered error callback, or logs it when none is registered. */
    private void dispatchError(Throwable error) {
        if (this.errorHandlers.isEmpty()) {
            LOG.warn("Processor {} signalled an error", this.name, error);
            return;
        }
        for (Consumer<? super Throwable> handler : this.errorHandlers) {
            try {
                handler.accept(error);
            } catch (Exception e) {
                LOG.warn("error handler failed", e);
            }
        }
    }

    /** Forwards the completion signal to every registered completion callback. */
    private void dispatchComplete() {
        for (Runnable handler : this.completeHandlers) {
            try {
                handler.run();
            } catch (Exception e) {
                LOG.warn("complete handler failed", e);
            }
        }
    }
}
